package com.gitee.jastee.kafka.Interceptor;

import cn.hutool.core.io.FileUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

/**
 * 消费者拦截器
 * @Author jast
 * @Date 2020/4/19 下午2:09
 * @Version 1.0
 */
public class JastConsumerInterceptor implements ConsumerInterceptor<String,String> {

    int i =  0;
    /**
     * 该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前，拦截器会先拦一道，搞一些事情，之后再返回给你。
     * @param consumerRecords
     * @return
     */
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        for(ConsumerRecord<String,String> record : consumerRecords){
            String value = record.value();
            JSONObject jsonObject = JSONUtil.parseObj(value);
            Long producer_time = jsonObject.getLong("producer_time");
            String content = "从发送到消费消耗了: "+(System.currentTimeMillis()-producer_time)+" ms" + "  "+(++i);
            System.out.println(content);
//            FileUtil.appendString(content+"\n","/home/jast/software/IdeaProjects/kafka-tool/kafka-util-collection/1.txt","utf-8");
//            System.out.println()  ;
        }
        return consumerRecords;
    }

    /**
     * Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作，比如打日志等。
     * @param map
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {

    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}
